package org.jetbrains.exposed.v1.r2dbc.statements.api

import io.r2dbc.spi.Result
import io.r2dbc.spi.Row
import io.r2dbc.spi.RowMetadata
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.count
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.reactive.asPublisher
import kotlinx.coroutines.reactive.collect
import org.jetbrains.exposed.v1.core.IColumnType
import org.jetbrains.exposed.v1.core.InternalApi
import org.jetbrains.exposed.v1.core.statements.api.ResultApi
import org.jetbrains.exposed.v1.core.statements.api.RowApi
import org.jetbrains.exposed.v1.core.transactions.withThreadLocalTransaction
import org.jetbrains.exposed.v1.core.vendors.currentDialect
import org.jetbrains.exposed.v1.r2dbc.mappers.R2dbcTypeMapping
import org.jetbrains.exposed.v1.r2dbc.transactions.TransactionManager
import org.reactivestreams.Publisher
import java.util.*
import kotlin.jvm.optionals.getOrNull

/**
 * Class responsible for wrapping a [Result] generated by executing a statement that queries an R2DBC database.
 *
 * An instance can be consumed only once: the first call to [mapRows], [mapSegments] or [rowsUpdated]
 * (or to [collect], which delegates to [mapRows]) marks it as consumed, and any later call to one of these
 * operations fails with an [IllegalStateException]. The mark is set when the operation is invoked,
 * not when the returned flow is collected.
 *
 * @property resultPublisher The actual [Publisher] of [Result]s returned by the database after statement execution.
 * @property typeMapping The type mapper logic handed to every [R2dbcRow] created by [mapRows].
 */
class R2dbcResult internal constructor(
    private val resultPublisher: Publisher<out Result>,
    internal val typeMapping: R2dbcTypeMapping
) : ResultApi {
    private var consumed = false

    /**
     * Marks this result as consumed.
     *
     * @throws IllegalStateException If the result has already been consumed by another flow operation.
     */
    private fun markConsumed() {
        check(!consumed) { "Result is already consumed" }
        consumed = true
    }

    /**
     * Returns a Flow containing the results of applying the given transform function [block] to each row,
     * wrapped as an [R2dbcRow], of every [Result] published after statement execution.
     *
     * The transaction that is current when this function is called is captured and made available
     * to [block] while it runs.
     *
     * @throws IllegalStateException If the result has already been consumed by another flow operation.
     */
    override fun <T> mapRows(block: (RowApi) -> T?): Flow<T?> {
        markConsumed()

        val currentTransaction = TransactionManager.currentOrNull()

        return flow {
            resultPublisher.collect { result ->
                result
                    .map { row, _ ->
                        // The current block is run in another thread outside of coroutine,
                        // so that thread should also get the correct transaction into the thread local variables
                        @OptIn(InternalApi::class)
                        withThreadLocalTransaction(currentTransaction) {
                            // A null mapping result travels through the publisher as an empty Optional
                            // and is unwrapped again right before it is emitted.
                            Optional.ofNullable(block(R2dbcRow(row, typeMapping)))
                        }
                    }
                    .collect { emit(it.getOrNull()) }
            }
        }
    }

    /**
     * Returns a Flow containing the results of applying the given transform function block to each original
     * [Result.Segment] generated by executing a statement.
     *
     * The transaction that is current when this function is called is captured and made available
     * to [block] while it runs.
     *
     * @throws IllegalStateException If the result has already been consumed by another flow operation.
     */
    fun <T : Any> mapSegments(block: (Result.Segment) -> Flow<T>): Flow<T> {
        markConsumed()

        val currentTransaction = TransactionManager.currentOrNull()

        return flow {
            resultPublisher.collect { result ->
                @OptIn(InternalApi::class)
                result.flatMap<T> { segment ->
                    // The current block is run in another thread outside of coroutine,
                    // so that thread should also get the correct transaction into the thread local variables
                    withThreadLocalTransaction(currentTransaction) {
                        block(segment).asPublisher()
                    }
                }.collect { emit(it) }
            }
        }
    }

    /**
     * Returns a Flow containing the number of rows updated by executing a statement.
     *
     * Every count published by the driver is converted with `toInt()` before it is emitted.
     *
     * @throws IllegalStateException If the result has already been consumed by another flow operation.
     */
    fun rowsUpdated(): Flow<Int> {
        markConsumed()

        return flow {
            resultPublisher.collect { result ->
                result.rowsUpdated.collect { count ->
                    emit(count.toInt())
                }
            }
        }
    }

    /**
     * Terminal flow operator that triggers the final collection of the original flow of results,
     * but without transforming or emitting any values.
     *
     * @throws IllegalStateException If the result has already been consumed by another flow operation.
     */
    suspend fun collect() {
        mapRows { }.collect()
    }

    override fun toString(): String = "R2dbcResult(resultPublisher = $resultPublisher)"

    /** Does nothing. */
    override fun close() = Unit
}

/**
 * Accessor over a single [Row], exposing its values by position or by column name.
 *
 * Positional accessors that read from [row] directly subtract one from the given index before
 * calling [Row.get], so callers are expected to pass 1-based positions.
 *
 * @param row The actual underlying wrapped [Row] that is being accessed.
 * @param typeMapping The type mapper logic being used to get values from a [Row].
 */
class R2dbcRow(val row: Row, private val typeMapping: R2dbcTypeMapping) : RowApi {
    /** Translates the position received from the caller into the position handed to [Row.get]. */
    private fun Int.toRowPosition(): Int = this - 1

    /** Returns the value at position [index] without requesting a specific type. */
    override fun getObject(index: Int): Any? = row.get(index.toRowPosition())

    /** Returns the value of the column called [name] without requesting a specific type. */
    override fun getObject(name: String): Any? {
        return row.get(name)
    }

    /** Returns the value at position [index], requested from the row as [type]. */
    @Suppress("unchecked_cast")
    override fun <T> getObject(index: Int, type: Class<T>): T? {
        val position = index.toRowPosition()
        return row.get(position, type) as T
    }

    /** Returns the value of the column called [name], requested from the row as [type]. */
    override fun <T> getObject(name: String, type: Class<T>): T? {
        return row.get(name, type)
    }

    /**
     * Returns the value at position [index], resolved through [typeMapping] using the current dialect
     * and the given [columnType].
     */
    override fun <T> getObject(index: Int, type: Class<T>?, columnType: IColumnType<*>): T? =
        // NOTE(review): unlike the other positional accessors, index is forwarded unchanged here;
        // presumably the type mapping applies its own offset - confirm.
        typeMapping.getValue(row, type, index, currentDialect, columnType)

    /** Returns the value at position [index] as a string, or `null` when the row holds no value there. */
    override fun getString(index: Int): String? {
        val text = row.get(index.toRowPosition(), java.lang.String::class.java)
        return text?.toString()
    }
}

/**
 * Counts the rows produced by this [ResultApi].
 *
 * The underlying flow of results is collected to completion in order to obtain the count; the rows
 * themselves are neither transformed nor returned.
 */
suspend fun ResultApi.rowsCount(): Int {
    // The empty mapper yields Unit for every row, so only the number of emissions matters.
    val emptyRows = mapRows { }
    return emptyRows.count()
}

/**
 * The actual underlying [Row] wrapped by this result [RowApi].
 *
 * The receiver is cast to [R2dbcRow] without a check, so a [ClassCastException] is thrown for any other
 * [RowApi] implementation.
 */
val RowApi.origin: Row
    get() {
        val r2dbcRow = this as R2dbcRow
        return r2dbcRow.row
    }

/**
 * The actual [RowMetadata] of the [Row] wrapped by this result [RowApi].
 *
 * The receiver is cast to [R2dbcRow] without a check, so a [ClassCastException] is thrown for any other
 * [RowApi] implementation.
 */
val RowApi.metadata: RowMetadata
    get() {
        val wrappedRow = (this as R2dbcRow).row
        return wrappedRow.metadata
    }
